<?php
/**
 * MQ消息接收-芒果注入主媒资分集修改
 * Created by <xinxin.deng>.
 * Author: xinxin.deng
 * Date: 2018/7/30 14:50
 */
include_once dirname(dirname(dirname(dirname(dirname(__FILE__)))))."/common.php";
\ns_core\m_load::load("ns_core.m_config");
\ns_core\m_load::load("ns_model.message.message_queue");
\ns_core\m_load::load("ns_model.message.message_explain");
\ns_core\m_load::load_old('nn_logic/nl_common.func.php') ;
\ns_core\m_load::load_old('nn_logic/cp/cp.class.php');
\ns_core\m_load::load_old('nn_logic/nl_log_v2.func.php');

// Global constants defined before the importer class is declared and run.
// Neither constant is read anywhere in this file; presumably the framework
// code included above consumes them — TODO confirm.
define("IS_LOG_TIMER_OPERATION", true);
// NOTE(review): the value matches this script's class name; looks like a
// source/organisation tag used by shared logging — confirm against the framework.
define('ORG_ID', 'amqp_import');

/**
 * Pulls messages from an AMQP queue configured on a CP and stores each one
 * through the inherited push() call.
 *
 * The CP id is taken from the name of the directory containing this script.
 */
class amqp_import extends ns_model\message\message_queue
{
    private $arr_host_config  = array();
    private $str_cp_id = '';
    /**
     * Template of the message handed to push().
     * @var array
     */
    public $arr_in_message = array(
        'base_info'=>array(
            'nns_message_time'=>'', // upstream message time: YmdHis followed by sub-second digits
            'nns_message_id'=>'',  // upstream message ID
            'nns_cp_id'=>'', // upstream CP identifier
            'nns_message_xml'=>'',  // original upstream payload; may be string | xml | json
            'nns_message_content'=>'',  // parsed upstream payload; string | xml | json, basic info used to build the queue
            'nns_action'=>'', // operation / action
            'nns_type'=>'', // message type
            'nns_name'=>'',  // message name
            'nns_package_id'=>'',  // package ID (Tianwei only, to be retired)
            'nns_xmlurlqc'=>'', // Guangzhou Telecom YueME MD5 digest
            'nns_encrypt'=>'', // Guangzhou Telecom YueME encrypted string
            'nns_content_number'=>'1', // number of content items contained in the xml file
            'nns_message_state'=>0,
        ), // basic info (stored in table nns_mgtvbk_message)
    );

    /**
     * amqp_import constructor.
     *
     * Derives the CP id from the last segment of this file's directory,
     * loads the XML helper and initialises the data connection.
     */
    public function __construct()
    {
        // Normalise both separator styles so the last path segment can be popped.
        $arr_dir = explode('|', str_replace(array ('/','\\'), '|', __DIR__));
        $this->str_cp_id = array_pop($arr_dir);
        \ns_core\m_load::load_np("np_xml_to_array.class.php");
        \m_config::get_dc();
    }

    /**
     * Pulls up to the configured number of messages from the AMQP queue and
     * imports each of them.
     *
     * A message is acknowledged only after it was stored successfully;
     * anything else stays unacknowledged on the broker.
     *
     * @return bool|array true when the pull loop finished, otherwise the
     *                    error result produced by m_config (CP lookup result
     *                    or return_data(1, reason))
     */
    public function import()
    {
        $this->arr_in_message['base_info']['nns_cp_id'] = $this->str_cp_id;
        $arr_cp_config = \m_config::_get_cp_info($this->str_cp_id);
        if ($arr_cp_config['ret'] != 0)
        {
            \m_config::write_message_receive_log($arr_cp_config['reason'], $this->str_cp_id);
            return $arr_cp_config;
        }
        if (empty($arr_cp_config['data_info']))
        {
            $str_reason = "获取CP[".$this->str_cp_id."]信息为空";
            \m_config::write_message_receive_log($str_reason, $this->str_cp_id);
            return \m_config::return_data(1, $str_reason);
        }

        $arr_config = isset($arr_cp_config['data_info']['nns_config']) ? $arr_cp_config['data_info']['nns_config'] : array();

        // Import switch: must be present and equal to '1'.
        // The message now names the key that is actually checked.
        if (!isset($arr_config['message_queue_import_enable']) || $arr_config['message_queue_import_enable'] != '1')
        {
            $str_reason = "获取CP[".$this->str_cp_id."]配置[message_queue_import_enable]amqp消息注入开关关闭";
            \m_config::write_message_receive_log($str_reason, $this->str_cp_id);
            return \m_config::return_data(1, $str_reason);
        }

        // Connection option => CP configuration key. The insertion order is
        // kept stable because the whole array is dumped into the logs.
        $arr_option_keys = array(
            'host'              => 'message_queue_import_host',     // queue server address
            'port'              => 'message_queue_import_port',     // queue server port
            'login'             => 'message_queue_import_user',     // account name
            'password'          => 'message_queue_import_pass',     // account password
            'vhost'             => 'message_queue_import_vhost',    // virtual host
            'channel_name'      => 'message_queue_import_channel',  // used below as the queue name
            'exchange_name'     => 'message_queue_import_exchange', // exchange name
            'queue_data_format' => 'message_queue_data_format',     // payload format, 'json' triggers conversion
        );
        foreach ($arr_option_keys as $str_option => $str_config_key)
        {
            $this->arr_host_config[$str_option] = isset($arr_config[$str_config_key]) ? $arr_config[$str_config_key] : '';
        }
        // A missing or blank port falls back to the default AMQP port.
        if (strlen((string)$this->arr_host_config['port']) < 1)
        {
            $this->arr_host_config['port'] = 5672;
        }
        // Maximum number of messages handled per run.
        $queue_num = isset($arr_config['message_queue_import_num']) ? (int)$arr_config['message_queue_import_num'] : 100;

        // Mandatory options. Every key is always assigned above, so the check
        // has to look at emptiness only (an isset() test can never fail here).
        $arr_required = array(
            'host'         => array('message_queue_import_host', "队列服务器host地址配置为空:"),
            'port'         => array('message_queue_import_port', "队列服务器port地址配置为空:"),
            'channel_name' => array('message_queue_import_channel', "队列服务器路由名配置为空:"),
        );
        foreach ($arr_required as $str_option => $arr_rule)
        {
            $str_value = $this->arr_host_config[$str_option];
            if (strlen((string)$str_value) < 1)
            {
                \m_config::write_message_receive_log($arr_rule[1].$str_value, $this->str_cp_id);
                return \m_config::return_data(1,"获取CP[".$this->str_cp_id."]配置[".$arr_rule[0]."]".$arr_rule[1].$str_value);
            }
        }

        // php-amqp signals failures with exceptions, so a failed connect is
        // folded into the same "not connected" path as a false return value.
        $conn = null;
        try
        {
            $conn = new \AMQPConnection($this->arr_host_config);
            $conn_result = $conn->connect();
        }
        catch (\AMQPException $e)
        {
            \m_config::write_message_receive_log("AMQP连接异常:".$e->getMessage(), $this->str_cp_id);
            $conn_result = false;
        }
        if (!$conn_result)
        {
            // NOTE(review): this dump contains the account password — consider masking it.
            \m_config::write_message_receive_log("AMQP服务器不能正常连接:".var_export($this->arr_host_config,true), $this->str_cp_id);
            return \m_config::return_data(1,"获取CP[".$this->str_cp_id."]AMQP服务器不能正常连接");
        }

        try
        {
            // Channel and queue handle on top of the connection.
            $channel = new \AMQPChannel($conn);
            $q = new \AMQPQueue($channel);
            $q->setName($this->arr_host_config['channel_name']);
            $q->setFlags(AMQP_DURABLE);
            \m_config::write_message_receive_log("-------------开始获取消息--------------", $this->str_cp_id);
            $queue_now_num = 0;

            // The limit is tested first so that no message is fetched (and
            // left unacknowledged) once the quota for this run is used up.
            while (($queue_now_num < $queue_num) && ($messages = $q->get()))
            {
                ++$queue_now_num;
                // Raw payload with HTML entities decoded.
                $str_data = htmlspecialchars_decode($messages->getBody(), ENT_QUOTES);
                \m_config::write_message_receive_log("获取的第{$queue_now_num}条数据xml为：".var_export($str_data,true), $this->str_cp_id);
                if ($this->arr_host_config['queue_data_format'] == 'json')
                {
                    $arr_data = json_decode($str_data,true);
                    if (!is_array($arr_data))
                    {
                        // Undecodable JSON is treated like an unparsable XML
                        // payload: logged, not stored, not acknowledged.
                        \m_config::write_message_receive_log("内容JSON解析错误:".var_export($str_data,true), $this->str_cp_id);
                        \m_config::write_message_receive_log("执行下一条", $this->str_cp_id);
                        continue;
                    }
                    $arr_data['contentformat'] = 'json';
                    $str_data = $this->arr2xml($arr_data,null);
                }
                \m_config::write_message_receive_log("------------开始注入-----------", $this->str_cp_id);
                $result = $this->import_message($str_data);
                \m_config::write_message_receive_log("------------注入结果-----------", $this->str_cp_id);
                if (!$result)
                {
                    \m_config::write_message_receive_log("注入入库失败", $this->str_cp_id);
                }
                else
                {
                    \m_config::write_message_receive_log("注入成功", $this->str_cp_id);
                    // Acknowledge only after the message was stored.
                    $q->ack($messages->getDeliveryTag());
                }

                \m_config::write_message_receive_log("执行下一条", $this->str_cp_id);
            }
        }
        catch (\AMQPException $e)
        {
            $str_reason = "获取CP[".$this->str_cp_id."]AMQP消息获取异常:".$e->getMessage();
            \m_config::write_message_receive_log($str_reason, $this->str_cp_id);
            return \m_config::return_data(1, $str_reason);
        }
        return true;
    }

    /**
     * Converts an array into XML.
     *
     * Nested arrays become <items{key}> elements, scalar values under numeric
     * keys become <item> elements, everything else becomes an element named
     * after the lower-cased key.
     *
     * @param array $arr  data to convert
     * @param \SimpleXMLElement|null $node  element to append to; null creates
     *                                      a new <assetcontent> document
     * @return mixed result of saveXML() on the element that was filled
     */
    public function arr2xml($arr,$node)
    {
        if ($node === null)
        {
            $simxml = new \SimpleXMLElement('<?xml version="1.0" encoding="utf-8" ?><assetcontent></assetcontent>');
        }
        else
        {
            $simxml = $node;
        }
        foreach ($arr as $k=>$v)
        {
            $k = strtolower($k);
            if (is_array($v))
            {
                // self:: keeps the original dispatch; $this is preserved when
                // the method is invoked on an instance.
                self::arr2xml($v,$simxml->addChild('items'.$k));
                continue;
            }
            if (is_string($v))
            {
                // addChild() treats '&' as the start of an entity reference,
                // so a raw ampersand must be escaped or the value is cut off.
                $v = str_replace('&', '&amp;', $v);
            }
            $simxml->addChild(is_numeric($k) ? 'item' : $k, $v);
        }
        return $simxml->saveXML();
    }

    /**
     * Fills the message template from one XML payload and stores it via push().
     *
     * @param string $params XML payload
     * @return bool true when push() reported ret == 0, false on a parse error
     *              or a failed push
     */
    public function import_message($params)
    {
        $xml_obj = simplexml_load_string($params);

        // NOTE(review): besides parse failures this also rejects a root
        // element without children or attributes, which casts to false.
        if (!$xml_obj)
        {
            \m_config::write_message_receive_log("内容XML解析错误:".var_export($params,true), $this->str_cp_id);
            return false;
        }

        // Operation: 1 add/modify, 2 delete.
        $this->arr_in_message['base_info']['nns_action'] = (int)$xml_obj->assetoperation[0];
        // 1 main asset, 2 episode, 3 media source.
        $this->arr_in_message['base_info']['nns_type'] = (int)$xml_obj->assetdesc[0];
        $this->arr_in_message['base_info']['nns_message_content'] = $params;
        // Message id.
        $this->arr_in_message['base_info']['nns_message_id'] = (string)$xml_obj->info->pushcode[0];

        // The template lives on the instance and is reused for every message,
        // so the name is always rewritten to avoid carrying over a stale one.
        $str_name = '';
        if ((string)$xml_obj->contentformat[0] !== 'json')
        {
            if ($this->arr_in_message['base_info']['nns_type'] == 1)
            {
                $str_name = (string)$xml_obj->content->clipname[0];
            }
            elseif ($this->arr_in_message['base_info']['nns_type'] == 2)
            {
                $str_name = (string)$xml_obj->content->partname[0];
            }
        }
        $this->arr_in_message['base_info']['nns_name'] = $str_name;

        // Timestamp: YmdHis plus the first four fractional digits, both taken
        // from the same microtime() reading so they cannot disagree.
        list ( $usec, $sec ) = explode ( ' ', microtime () );
        $str_fraction = str_pad(intval(substr($usec, 2, 4)),4,'0',STR_PAD_LEFT);
        $this->arr_in_message['base_info']['nns_message_time'] = date('YmdHis', (int)$sec) . $str_fraction;
        // NOTE(review): this writes a top-level key, not base_info['nns_message_xml'];
        // kept as-is because push() is not visible here — confirm which one is intended.
        $this->arr_in_message['nns_message_xml'] = '';
        $result_message_add = $this->push($this->arr_in_message);
        if ($result_message_add['ret'] !=0)
        {
            \m_config::write_message_receive_log("入库消息失败:".var_export($result_message_add,true), $this->str_cp_id);
            return false;
        }

        return true;
    }
}
// Script entry point: run one import pass and print 1 on success, 0 otherwise.
$amqp_import_v2 = new amqp_import();
$result = $amqp_import_v2->import();
// import() returns true on success and an error result array on failure.
// A non-empty array is truthy, so only an exact true may count as success.
if($result === true)
{
    echo 1;
}
else
{
    echo 0;
}
